/**
 * Project Name:KettleEasyExpand
 * Date:2018年5月18日
 * Copyright (c) 2018, jingma All Rights Reserved.
 */

package cn.benma666.kettle.loglistener;

import cn.benma666.constants.UtilConst;
import cn.benma666.domain.SysPtglXtxx;
import cn.benma666.exception.MyException;
import cn.benma666.iframe.BasicObject;
import cn.benma666.iframe.Conf;
import cn.benma666.kettle.domain.VJob;
import cn.benma666.kettle.mytuils.KettleManager;
import cn.benma666.myutils.DateUtil;
import cn.benma666.myutils.StringUtil;
import cn.benma666.sjsj.web.XtxxWebSocket;
import cn.benma666.sjzt.Db;
import com.alibaba.fastjson.JSON;
import org.apache.commons.vfs2.FileObject;
import org.beetl.sql.core.SqlId;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.*;
import org.pentaho.di.core.vfs.KettleVFS;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 定制的文件日志记录监听器 <br/>
 * date: 2018年5月18日 <br/>
 * @author jingma
 * @version 0.1
 */
public class FileLoggingEventListener extends BasicObject implements KettleLoggingEventListener {
    /**
     * 日志预警匹配的正则
     */
    public static Pattern logPatt = Pattern.compile(Conf.getVal("benma666.km.log-warning-pattern","(Exception|Error)"));
    /**
     * 日志管道仓库
     */
    private static final LoggingRegistry lr = LoggingRegistry.getInstance();
    /**
     * 日志对应作业
     */
    private VJob job;
    /**
     * 是否关闭
     */
    private boolean closed = false;
    /**
     * 最后更新时间
     */
    private Date lastupdate = new Date();

    /**
     * Creates a new instance of FileLoggingEventListener.
     */
    public FileLoggingEventListener() {
    }

    /**
     * Creates a new instance of FileLoggingEventListener.
     */
    public FileLoggingEventListener(String logChannelId, File file,
                                    boolean append, VJob job) throws KettleException {
        this(logChannelId, file.getAbsolutePath(), append);
        this.job = job;
        job.setRzwj(file);
    }

    /**
     *
     * @see org.pentaho.di.core.logging.FileLoggingEventListener#eventAdded(org.pentaho.di.core.logging.KettleLoggingEvent)
     */
    @Override
    public void eventAdded(KettleLoggingEvent event) {
        // 日志是否写入
        try {
            Object messageObject = event.getMessage();
            if (!(messageObject instanceof LogMessage)) {
                log.error("不是LogMessage对象：" + JSON.toJSONString(event));
                return;
            }
            LogMessage message = (LogMessage) messageObject;
            String tn = Thread.currentThread().getName();
            if (isBlank(message.getSubject())) {
                message.setSubject(tn);
            }
            VJob job = null;
            if(tn.contains(" - ")){
                job = KettleManager.JobMap.get(tn.substring(0, tn.indexOf(" - ")));
            }
            // 日志没有写入文件
            if (job != null) {
                //推送实时日志
                SysPtglXtxx xtxx = SysPtglXtxx.builder().xxnr(DateUtil.getDateTimeStr()
                        + " - " + message.getSubject()+ " - " + message.getMessage())
                        .mdddl("KETTLE_GLPT_ZYGL").mddxl(job.getKey()).build();
                XtxxWebSocket.sendMsg(xtxx, null);
                myLogDispose(job, message);
                job.getFlel().setLastupdate(new Date());
                if (KettleManager.isWriteLogFile()) {
                    job.getFlel().writeLog(event);
                }
            } else {
                log.info("丢失的kettle日志：" + JSON.toJSONString(event));
            }
        } catch (Exception e) {
            log.error("作业日志处理失败:" + JSON.toJSONString(event), e);
        }
    }

    /**
     * 我的日志处理 ：日志管道时间更新、异常检测<br/>
     * @author jingma
     */
    public void myLogDispose(VJob job, LogMessage message) {
        // 获取日志内容
        String joblogStr = message.getMessage();
        // 日志关键字预警
        Matcher m = logPatt.matcher(joblogStr);
        //没有匹配上且日志级别不是错误
        if (!m.find() && !message.getLevel().isError()) {
            return;
        }
        if (message.getArguments() != null && message.getArguments().length > 0) {
            //得到异常具体消息，使短信中得到跟具体的信息
            if (message.getArguments()[0] instanceof Throwable) {
                joblogStr = ((Throwable) message.getArguments()[0]).getMessage() + Const.CR + joblogStr;
            }
        }
        String msg = getExceptionMsg(joblogStr, m);
        String logLevel = message.getLevel().getLevel() + "";
        int error = StringUtil.whether(message.isError());
        String subject = message.getSubject();
        String logChannel = message.getLogChannelId();
        if (job != null && job.getRzwj() != null) {
            Db.use(job.getZyk()).update(SqlId.of("kee", "insertZyyj"),
                    Db.buildMap(job.getIdJob(), job.getName(), job.getRzwj().getAbsolutePath(),
                            msg, logLevel, error, subject, logChannel,Conf.getAppdm()));
        }
    }

    /**
     * 监测文件大小，进行日志文件更换 <br/>
     * @author jingma
     */
    public static void checkLogFileSize(VJob job) throws KettleException {
        // 日志文件大小判断
        if (job.getRzwj() != null && job.getRzwj().length() >
                KettleManager.getLogFileSize() * 1024 * 1024) {
            // 每个日志文件记录一条作业日志，用户可以根据时间区间选择要下载的日志。
            synchronized (KettleManager.JobMap) {
                close(job);
                addJobLogFile(job);
            }
        }
    }

    /**
     * 关闭作业的日志相关资源 <br/>
     * @author jingma
     */
    public static void close(VJob job) throws KettleException {
        synchronized (KettleManager.JobMap) {
            job.getFlel().close();
            updateJoblog(job);
        }
    }

    /**
     * 添加作业日志文件 <br/>
     * @author jingma
     */
    public static void addJobLogFile(VJob job) throws KettleException {
        synchronized (KettleManager.JobMap) {
            // 记录日志记录的主键，用于更新
            job.setJcrzzj(StringUtil.getUUIDUpperStr());
            job.setZykssj(DateUtil.getGabDate());
            job.setFlel(new FileLoggingEventListener(job.getJob().getLogChannelId(),
                    getNewLogFile(job), true, job));
        }
    }

    /**
     * 获取作业新的日志文件 <br/>
     * @author jingma
     */
    public static File getNewLogFile(VJob job) {
        File logFile;
        logFile = new File(KettleManager.getLogFileRoot() + UtilConst.FXG
                + DateUtil.doFormatDate(new Date(), DateUtil.DATE_FORMATTER8));
        if (!logFile.exists()) {
            if (!logFile.mkdirs()) {
                throw new MyException("日志文件目录创建失败：" + logFile.getAbsolutePath());
            }
        }
        logFile = new File(logFile.getAbsolutePath() + UtilConst.FXG
                + job.getName() + "_" + DateUtil.doFormatDate(new Date(),
                "HHmmss") + ".txt");
        job.setRzwj(logFile);
        //生成日志文件时就插入日志记录，便于用户在运行中查询下载作业日志，因为作业管理只显示最近时间的实时日志
        Db.use(job.getZyk()).update(SqlId.of("kee", "insertLog"),
                Db.buildMap(job.getJcrzzj(), job.getIdJob(),
                        job.getName(), job.getZykssj(), logFile.getAbsolutePath(),Conf.getAppdm()));
        job.setZykssj(null);
        return logFile;
    }

    /**
     * 每个日志文件记录一条作业日志，用户可以根据时间区间选择要下载的日志。 <br/>
     * @author jingma
     * @param job 作业
     */
    public static void updateJoblog(VJob job) {
        String dqsj = DateUtil.getGabDate();
        Db.use(job.getZyk()).update(SqlId.of("kee", "updateLog"),
                Db.buildMap(dqsj, KettleManager.getJobStatus(job), job.getJcrzzj()));
    }

    /**
     * 获取主要异常消息 <br/>
     * @author jingma
     */
    public static String getExceptionMsg(String joblogStr, Matcher m) {
        if (joblogStr.length() <= 3000) {
            return joblogStr;
        }
        //没有匹配上
        if (!m.find()) {
            return joblogStr.substring(0, 3000);
        }
        if (m.start() <= 100) {
            return joblogStr.substring(0, 3000);
        } else if (joblogStr.length() - m.start() + 100 <= 3000) {
            return joblogStr.substring(m.start() - 100);
        } else {
            return joblogStr.substring(m.start() - 100, m.start() + 2900);
        }
    }

    /**
     * @return job
     */
    public VJob getJob() {
        return job;
    }

    /**
     * @param job the job to set
     */
    public void setJob(VJob job) {
        this.job = job;
    }

    ///////////////////////基本原监听器拷贝过来的//////////////////

    private String filename;

    private OutputStream outputStream;
    private KettleLogLayout layout;

    private KettleException exception;
    private String logChannelId;

    /**
     * Log all log lines to the specified file
     */
    public FileLoggingEventListener(String filename, boolean append) throws KettleException {
        this(null, filename, append);
    }

    /**
     * Log only lines belonging to the specified log channel ID or one of it's children (grandchildren) to the specified
     * file.
     */
    public FileLoggingEventListener(String logChannelId, String filename, boolean append) throws KettleException {
        this.logChannelId = logChannelId;
        this.filename = filename;
        this.layout = new KettleLogLayout(true);
        this.exception = null;

        FileObject file = KettleVFS.getFileObject(filename);
        outputStream = null;
        try {
            outputStream = KettleVFS.getOutputStream(file, append);
        } catch (Exception e) {
            throw new KettleException(
                    "Unable to create a logging event listener to write to file '" + filename + "'", e);
        }
    }

    public void writeLog(KettleLoggingEvent event) throws IOException {
        String logText = new Date() + layout.format(event);
        outputStream.write(logText.getBytes());
        outputStream.write(Const.CR.getBytes());
    }

    public void close() throws KettleException {
        try {
            if (outputStream != null) {
                outputStream.close();
            }
            closed = true;
        } catch (Exception e) {
            throw new KettleException("Unable to close output of file '" + filename + "'", e);
        }
    }

    public KettleException getException() {
        return exception;
    }

    public void setException(KettleException exception) {
        this.exception = exception;
    }

    public String getFilename() {
        return filename;
    }

    public void setFilename(String filename) {
        this.filename = filename;
    }

    public OutputStream getOutputStream() {
        return outputStream;
    }

    public void setOutputStream(OutputStream outputStream) {
        this.outputStream = outputStream;
    }

    /**
     * @return closed
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * @param closed the closed to set
     */
    public void setClosed(boolean closed) {
        this.closed = closed;
    }

    /**
     * @return lastupdate
     */
    public Date getLastupdate() {
        return lastupdate;
    }

    /**
     * @param lastupdate the lastupdate to set
     */
    public void setLastupdate(Date lastupdate) {
        this.lastupdate = lastupdate;
    }

}
